/*
 *
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

package net.srt.flink.metadata.hive.driver;

import net.srt.flink.common.assertion.Asserts;
import net.srt.flink.common.model.Column;
import net.srt.flink.common.model.Schema;
import net.srt.flink.common.model.Table;
import net.srt.flink.common.utils.LogUtil;
import net.srt.flink.metadata.base.convert.ITypeConvert;
import net.srt.flink.metadata.base.driver.AbstractJdbcDriver;
import net.srt.flink.metadata.base.driver.Driver;
import net.srt.flink.metadata.base.query.IDBQuery;
import net.srt.flink.metadata.base.result.JdbcSelectResult;
import net.srt.flink.metadata.hive.constant.HiveConstant;
import net.srt.flink.metadata.hive.convert.HiveTypeConvert;
import net.srt.flink.metadata.hive.query.HiveQuery;
import org.apache.commons.lang3.StringUtils;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HiveDriver extends AbstractJdbcDriver implements Driver {

	@Override
	public Table getTable(String schemaName, String tableName) {
		List<Table> tables = listTables(schemaName);
		Table table = null;
		for (Table item : tables) {
			if (Asserts.isEquals(item.getName(), tableName)) {
				table = item;
				break;
			}
		}
		if (Asserts.isNotNull(table)) {
			table.setColumns(listColumns(schemaName, table.getName()));
		}
		return table;
	}

	@Override
	public List<Table> listTables(String schemaName) {
		List<Table> tableList = new ArrayList<>();
		PreparedStatement preparedStatement = null;
		ResultSet results = null;
		IDBQuery dbQuery = getDBQuery();
		String sql = dbQuery.tablesSql(schemaName);
		try {
			execute(String.format(HiveConstant.USE_DB, schemaName));
			preparedStatement = conn.get().prepareStatement(sql);
			results = preparedStatement.executeQuery();
			ResultSetMetaData metaData = results.getMetaData();
			List<String> columnList = new ArrayList<>();
			for (int i = 1; i <= metaData.getColumnCount(); i++) {
				columnList.add(metaData.getColumnLabel(i));
			}
			while (results.next()) {
				String tableName = results.getString(dbQuery.tableName());
				if (Asserts.isNotNullString(tableName)) {
					Table tableInfo = new Table();
					tableInfo.setName(tableName);
					if (columnList.contains(dbQuery.tableComment())) {
						tableInfo.setComment(results.getString(dbQuery.tableComment()));
					}
					tableInfo.setSchema(schemaName);
					if (columnList.contains(dbQuery.tableType())) {
						tableInfo.setType(results.getString(dbQuery.tableType()));
					}
					if (columnList.contains(dbQuery.catalogName())) {
						tableInfo.setCatalog(results.getString(dbQuery.catalogName()));
					}
					if (columnList.contains(dbQuery.engine())) {
						tableInfo.setEngine(results.getString(dbQuery.engine()));
					}
					tableList.add(tableInfo);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			close(preparedStatement, results);
		}
		return tableList;
	}

	@Override
	public List<Schema> getSchemasAndTables() {
		return listSchemas();
	}

	@Override
	public List<Schema> listSchemas() {

		List<Schema> schemas = new ArrayList<>();
		PreparedStatement preparedStatement = null;
		ResultSet results = null;
		String schemasSql = getDBQuery().schemaAllSql();
		try {
			preparedStatement = conn.get().prepareStatement(schemasSql);
			results = preparedStatement.executeQuery();
			while (results.next()) {
				String schemaName = results.getString(getDBQuery().schemaName());
				if (Asserts.isNotNullString(schemaName)) {
					Schema schema = new Schema(schemaName);
					if (execute(String.format(HiveConstant.USE_DB, schemaName))) {
						schema.setTables(listTables(schema.getName()));
					}
					schemas.add(schema);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			close(preparedStatement, results);
		}
		return schemas;
	}

	@Override
	public List<Column> listColumns(String schemaName, String tableName) {
		List<Column> columns = new ArrayList<>();
		PreparedStatement preparedStatement = null;
		ResultSet results = null;
		IDBQuery dbQuery = getDBQuery();
		String tableFieldsSql = dbQuery.columnsSql(schemaName, tableName);
		try {
			preparedStatement = conn.get().prepareStatement(tableFieldsSql);
			results = preparedStatement.executeQuery();
			ResultSetMetaData metaData = results.getMetaData();
			List<String> columnList = new ArrayList<>();
			for (int i = 1; i <= metaData.getColumnCount(); i++) {
				columnList.add(metaData.getColumnLabel(i));
			}
			Integer positionId = 1;
			while (results.next()) {
				Column field = new Column();
				if (StringUtils.isEmpty(results.getString(dbQuery.columnName()))) {
					break;
				} else {
					if (columnList.contains(dbQuery.columnName())) {
						String columnName = results.getString(dbQuery.columnName());
						field.setName(columnName);
					}
					if (columnList.contains(dbQuery.columnType())) {
						field.setType(results.getString(dbQuery.columnType()));
					}
					if (columnList.contains(dbQuery.columnComment()) && Asserts.isNotNull(results.getString(dbQuery.columnComment()))) {
						String columnComment = results.getString(dbQuery.columnComment()).replaceAll("\"|'", "");
						field.setComment(columnComment);
					}
					field.setPosition(positionId++);
					field.setJavaType(getTypeConvert().convert(field));
				}
				columns.add(field);
			}
		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			close(preparedStatement, results);
		}
		return columns;
	}

	@Override
	public String getCreateTableSql(Table table) {
		StringBuilder createTable = new StringBuilder();
		PreparedStatement preparedStatement = null;
		ResultSet results = null;
		String createTableSql = getDBQuery().createTableSql(table.getSchema(), table.getName());
		try {
			preparedStatement = conn.get().prepareStatement(createTableSql);
			results = preparedStatement.executeQuery();
			while (results.next()) {
				createTable.append(results.getString(getDBQuery().createTableName())).append("\n");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			close(preparedStatement, results);
		}
		return createTable.toString();
	}

	@Override
	public int executeUpdate(String sql) throws Exception {
		Asserts.checkNullString(sql, "Sql 语句为空");
		String querySQL = sql.trim().replaceAll(";$", "");
		int res = 0;
		try (Statement statement = conn.get().createStatement()) {
			res = statement.executeUpdate(querySQL);
		}
		return res;
	}

	@Override
	public JdbcSelectResult query(String sql, Integer limit) {
		if (Asserts.isNull(limit)) {
			limit = 100;
		}
		JdbcSelectResult result = new JdbcSelectResult();
		List<LinkedHashMap<String, Object>> datas = new ArrayList<>();
		List<Column> columns = new ArrayList<>();
		List<String> columnNameList = new ArrayList<>();
		PreparedStatement preparedStatement = null;
		ResultSet results = null;
		int count = 0;
		try {
			String querySQL = sql.trim().replaceAll(";$", "");
			preparedStatement = conn.get().prepareStatement(querySQL);
			results = preparedStatement.executeQuery();
			if (Asserts.isNull(results)) {
				result.setSuccess(true);
				close(preparedStatement, results);
				return result;
			}
			ResultSetMetaData metaData = results.getMetaData();
			for (int i = 1; i <= metaData.getColumnCount(); i++) {
				columnNameList.add(metaData.getColumnLabel(i));
				Column column = new Column();
				column.setName(metaData.getColumnLabel(i));
				column.setType(metaData.getColumnTypeName(i));
				column.setAutoIncrement(metaData.isAutoIncrement(i));
				column.setNullable(metaData.isNullable(i) == 0 ? false : true);
				column.setJavaType(getTypeConvert().convert(column));
				columns.add(column);
			}
			result.setColumns(columnNameList);
			while (results.next()) {
				LinkedHashMap<String, Object> data = new LinkedHashMap<>();
				for (int i = 0; i < columns.size(); i++) {
					data.put(columns.get(i).getName(), getTypeConvert().convertValue(results, columns.get(i).getName(), columns.get(i).getType()));
				}
				datas.add(data);
				count++;
				if (count >= limit) {
					break;
				}
			}
			result.setSuccess(true);
		} catch (Exception e) {
			result.setError(LogUtil.getError(e));
			result.setSuccess(false);
		} finally {
			close(preparedStatement, results);
			result.setRowData(datas);
			return result;
		}
	}

	@Override
	public IDBQuery getDBQuery() {
		return new HiveQuery();
	}

	@Override
	public ITypeConvert getTypeConvert() {
		return new HiveTypeConvert();
	}

	@Override
	public String getDriverClass() {
		return "org.apache.hive.jdbc.HiveDriver";
	}

	@Override
	public String getType() {
		return "Hive";
	}

	@Override
	public String getName() {
		return "Hive";
	}

	@Override
	public Map<String, String> getFlinkColumnTypeConversion() {
		HashMap<String, String> map = new HashMap<>();
		map.put("BOOLEAN", "BOOLEAN");
		map.put("TINYINT", "TINYINT");
		map.put("SMALLINT", "SMALLINT");
		map.put("INT", "INT");
		map.put("VARCHAR", "STRING");
		map.put("TEXY", "STRING");
		map.put("INT", "INT");
		map.put("DATETIME", "TIMESTAMP");
		return map;
	}
}
